/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.api.service.impl;

import com.chinamobile.cmss.lakehouse.api.dto.DataSourceBean;
import com.chinamobile.cmss.lakehouse.api.service.LinkService;
import com.chinamobile.cmss.lakehouse.common.Constants;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.FieldConf;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.FileParam;
import com.chinamobile.cmss.lakehouse.common.enums.DataSourceTypeEnum;
import com.chinamobile.cmss.lakehouse.common.exception.BaseException;
import com.chinamobile.cmss.lakehouse.dao.entity.DataSourceEntity;
import com.chinamobile.cmss.lakehouse.service.hive.HiveAccessService;

import java.util.List;

import javax.annotation.PostConstruct;

import com.alibaba.fastjson.JSONObject;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("linkS3Service")
@Slf4j
public class LinkS3ServiceImpl implements LinkService {
    public static final String S3_SCHEMA = "s3a://";
    public static final String HIVE_S3_TABLE = "CREATE EXTERNAL TABLE IF NOT EXISTS %s ( %s ) STORED AS TEXTFILE location '%s' "
            + "tblproperties ('fs.s3a.endpoint'='%s',"
            + "'fs.s3a.access.key'='%s',"
            + "'fs.s3a.secret.key'='%s')";
    private static final String HIVE_DELIMITER = "`";
    private static final String FIELD_DELIMITER = ",";

    AmazonS3 s3Client;
    @Autowired
    private HiveAccessService hiveAccessService;

    @PostConstruct
    public void init() {
        AWSCredentials credentials = new BasicAWSCredentials(Constants.S3A_ACCESS_KEY, Constants.S3A_SECRET_KEY);
        ClientConfiguration opts = new ClientConfiguration();
        opts.setSignerOverride("S3SignerType");
        s3Client = AmazonS3ClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withClientConfiguration(opts)
                .withPathStyleAccessEnabled(true)
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(Constants.S3A_ENDPOINT, ""))
                .build();
    }

    @Override
    public void validate(DataSourceBean dataSource) {
        if (StringUtils.isBlank(dataSource.getIp()) && StringUtils.isBlank(dataSource.getUrl())) {
            throw new BaseException("Ip can't be null.");
        }
    }

    @Override
    public String appendUrl(DataSourceBean dataSourceBean) {
        return null;
    }

    @Override
    public Boolean isConnected(DataSourceEntity dataSourceEntity) {
        try {
            s3Client.getS3AccountOwner();
        } catch (Exception e) {
            log.error("Connect to s3 err.", e);
            return Boolean.FALSE;
        }

        return Boolean.TRUE;
    }

    @Override
    public String getTablePathSchema(DataSourceTypeEnum dataSourceType, String param) {
        FileParam fileParam = JSONObject.parseObject(param, FileParam.class);
        return fileParam.getDataBase();
    }

    @Override
    public Boolean checkTablePathExist(DataSourceEntity dataSourceEntity, String schema, String tablePath, String userName) {
        try {
            return hiveAccessService.isTableExist(schema, tablePath, userName);
        } catch (Exception e) {
            log.warn("Table [{}] not exist [{}]", tablePath, e.getMessage());
            return Boolean.FALSE;
        }
    }

    @Override
    public Boolean createNewTablePath(DataSourceEntity dataSourceEntity, String schema, String table, String userName, String param, List<FieldConf> metaColumns) {
        try {
            FileParam fileParam = JSONObject.parseObject(param, FileParam.class);

            StringBuilder sb = new StringBuilder();
            metaColumns.forEach(metaColumn -> {
                sb.append(HIVE_DELIMITER).append(metaColumn.getName()).append(HIVE_DELIMITER)
                        .append(" ").append(metaColumn.getType()).append(",");
            });
            sb.setLength(sb.length() - 1);
            String dbTable = schema + "." + table;
            String createHiveTableSql = String.format(HIVE_S3_TABLE, dbTable, sb, fileParam.getPath(), Constants.S3A_ENDPOINT,
                    Constants.S3A_ACCESS_KEY, Constants.S3A_SECRET_KEY);

            return hiveAccessService.executeSql(userName, createHiveTableSql);
        } catch (Exception e) {
            log.warn("Table [{}] create err [{}]", table, e.getMessage());
            throw new RuntimeException(e);
        }
    }
}
